#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# This script is meant for developers of DataFusion -- it is runnable
# from the standard DataFusion development environment and uses cargo,
# etc and orchestrates gathering data and run the benchmark binary in
# different configurations.


# Exit on error
set -e

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

# Execute command and also print it, for debugging purposes
debug_run() {
    set -x
    "$@"
    set +x
}

# Set Defaults
COMMAND=
BENCHMARK=all
DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
VIRTUAL_ENV=${VIRTUAL_ENV:-$SCRIPT_DIR/venv}

usage() {
    echo "
Orchestrates running benchmarks against DataFusion checkouts

Usage:
$0 data [benchmark]
$0 run [benchmark] [query]
$0 compare <branch1> <branch2>
$0 compare_detail <branch1> <branch2>
$0 venv

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Examples:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Create the datasets for all benchmarks in $DATA_DIR
./bench.sh data

# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion
DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Commands
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
data:            Generates or downloads data needed for benchmarking
run:             Runs the named benchmark
compare:         Compares fastest results from benchmark runs
compare_detail:  Compares minimum, average (±stddev), and maximum results from benchmark runs
venv:            Creates new venv (unless already exists) and installs compare's requirements into it

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Benchmarks
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Run all of the following benchmarks
all(default): Data/Run/Compare for all benchmarks

# TPC-H Benchmarks
tpch:                   TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join
tpch_csv:               TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single csv file per table, hash join
tpch_mem:               TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10:                 TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join
tpch_csv10:             TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single csv file per table, hash join
tpch_mem10:             TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory

# Extended TPC-H Benchmarks
sort_tpch:              Benchmark of sorting speed for end-to-end sort queries on TPC-H dataset (SF=1)
sort_tpch10:            Benchmark of sorting speed for end-to-end sort queries on TPC-H dataset (SF=10)
topk_tpch:              Benchmark of top-k (sorting with limit) queries on TPC-H dataset (SF=1)
external_aggr:          External aggregation benchmark on TPC-H dataset (SF=1)

# ClickBench Benchmarks
clickbench_1:           ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
clickbench_pushdown:    ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
clickbench_extended:    ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# H2O.ai Benchmarks (Group By, Join, Window)
h2o_small:                      h2oai benchmark with small dataset (1e7 rows) for groupby,  default file format is csv
h2o_medium:                     h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
h2o_big:                        h2oai benchmark with large dataset (1e9 rows) for groupby,  default file format is csv
h2o_small_join:                 h2oai benchmark with small dataset (1e7 rows) for join,  default file format is csv
h2o_medium_join:                h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
h2o_big_join:                   h2oai benchmark with large dataset (1e9 rows) for join,  default file format is csv
h2o_small_window:               Extended h2oai benchmark with small dataset (1e7 rows) for window,  default file format is csv
h2o_medium_window:              Extended h2oai benchmark with medium dataset (1e8 rows) for window, default file format is csv
h2o_big_window:                 Extended h2oai benchmark with large dataset (1e9 rows) for window,  default file format is csv
h2o_small_parquet:              h2oai benchmark with small dataset (1e7 rows) for groupby,  file format is parquet
h2o_medium_parquet:             h2oai benchmark with medium dataset (1e8 rows) for groupby, file format is parquet
h2o_big_parquet:                h2oai benchmark with large dataset (1e9 rows) for groupby,  file format is parquet
h2o_small_join_parquet:         h2oai benchmark with small dataset (1e7 rows) for join,  file format is parquet
h2o_medium_join_parquet:        h2oai benchmark with medium dataset (1e8 rows) for join, file format is parquet
h2o_big_join_parquet:           h2oai benchmark with large dataset (1e9 rows) for join,  file format is parquet
h2o_small_window_parquet:       Extended h2oai benchmark with small dataset (1e7 rows) for window,  file format is parquet
h2o_medium_window_parquet:      Extended h2oai benchmark with medium dataset (1e8 rows) for window, file format is parquet
h2o_big_window_parquet:         Extended h2oai benchmark with large dataset (1e9 rows) for window,  file format is parquet

# Join Order Benchmark (IMDB)
imdb:                   Join Order Benchmark (JOB) using the IMDB dataset converted to parquet

# Micro-Benchmarks (specific operators and features)
cancellation:           How long cancelling a query takes
nlj:                    Benchmark for simple nested loop joins, testing various join scenarios
hj:                     Benchmark for simple hash joins, testing various join scenarios
compile_profile:        Compile and execute TPC-H across selected Cargo profiles, reporting timing and binary size


━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Supported Configuration (Environment Variables)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
DATA_DIR            directory to store datasets
CARGO_COMMAND       command that runs the benchmark binary
DATAFUSION_DIR      directory to use (default $DATAFUSION_DIR)
RESULTS_NAME        folder where the benchmark files are stored
PREFER_HASH_JOIN    Prefer hash join algorithm (default true)
VENV_PATH           Python venv to use for compare and venv commands (default ./venv, override by <your-venv>/bin/activate)
DATAFUSION_*        Set the given datafusion configuration
"
    exit 1
}

# https://stackoverflow.com/questions/192249/how-do-i-parse-command-line-arguments-in-bash
POSITIONAL_ARGS=()

while [[ $# -gt 0 ]]; do
    case $1 in
        # -e|--extension)
        #   EXTENSION="$2"
        #   shift # past argument
        #   shift # past value
        #   ;;
        -h|--help)
            shift # past argument
            usage
            ;;
        -*)
            echo "Unknown option $1"
            exit 1
            ;;
        *)
            POSITIONAL_ARGS+=("$1") # save positional arg
            shift # past argument
            ;;
    esac
done

set -- "${POSITIONAL_ARGS[@]}" # restore positional parameters
COMMAND=${1:-"${COMMAND}"}
ARG2=$2
ARG3=$3

# Do what is requested
main() {
    # Command Dispatch
    case "$COMMAND" in
        data)
            BENCHMARK=${ARG2:-"${BENCHMARK}"}
            echo "***************************"
            echo "DataFusion Benchmark Runner and Data Generator"
            echo "COMMAND: ${COMMAND}"
            echo "BENCHMARK: ${BENCHMARK}"
            echo "DATA_DIR: ${DATA_DIR}"
            echo "CARGO_COMMAND: ${CARGO_COMMAND}"
            echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
            echo "***************************"
            case "$BENCHMARK" in
                all)
                    data_tpch "1"
                    data_tpch "10"
                    data_h2o "SMALL"
                    data_h2o "MEDIUM"
                    data_h2o "BIG"
                    data_h2o_join "SMALL"
                    data_h2o_join "MEDIUM"
                    data_h2o_join "BIG"
                    data_clickbench_1
                    data_clickbench_partitioned
                    data_imdb
                    # nlj uses range() function, no data generation needed
                    ;;
                tpch)
                    data_tpch "1"
                    ;;
                tpch_mem)
                    # same data as for tpch
                    data_tpch "1"
                    ;;
                tpch10)
                    data_tpch "10"
                    ;;
                tpch_mem10)
                    # same data as for tpch10
                    data_tpch "10"
                    ;;
                clickbench_1)
                    data_clickbench_1
                    ;;
                clickbench_partitioned)
                    data_clickbench_partitioned
                    ;;
                clickbench_pushdown)
                    data_clickbench_partitioned # same data as clickbench_partitioned
                    ;;
                clickbench_extended)
                    data_clickbench_1
                    ;;
                imdb)
                    data_imdb
                    ;;
                h2o_small)
                    data_h2o "SMALL" "CSV"
                    ;;
                h2o_medium)
                    data_h2o "MEDIUM" "CSV"
                    ;;
                h2o_big)
                    data_h2o "BIG" "CSV"
                    ;;
                h2o_small_join)
                    data_h2o_join "SMALL" "CSV"
                    ;;
                h2o_medium_join)
                    data_h2o_join "MEDIUM" "CSV"
                    ;;
                h2o_big_join)
                    data_h2o_join "BIG" "CSV"
                    ;;
                # h2o window benchmark uses the same data as the h2o join
                h2o_small_window)
                    data_h2o_join "SMALL" "CSV"
                    ;;
                h2o_medium_window)
                    data_h2o_join "MEDIUM" "CSV"
                    ;;
                h2o_big_window)
                    data_h2o_join "BIG" "CSV"
                    ;;
                h2o_small_parquet)
                    data_h2o "SMALL" "PARQUET"
                    ;;
                h2o_medium_parquet)
                    data_h2o "MEDIUM" "PARQUET"
                    ;;
                h2o_big_parquet)
                    data_h2o "BIG" "PARQUET"
                    ;;
                h2o_small_join_parquet)
                    data_h2o_join "SMALL" "PARQUET"
                    ;;
                h2o_medium_join_parquet)
                    data_h2o_join "MEDIUM" "PARQUET"
                    ;;
                h2o_big_join_parquet)
                    data_h2o_join "BIG" "PARQUET"
                    ;;
                # h2o window benchmark uses the same data as the h2o join
                h2o_small_window_parquet)
                    data_h2o_join "SMALL" "PARQUET"
                    ;;
                h2o_medium_window_parquet)
                    data_h2o_join "MEDIUM" "PARQUET"
                    ;;
                h2o_big_window_parquet)
                    data_h2o_join "BIG" "PARQUET"
                    ;;
                external_aggr)
                    # same data as for tpch
                    data_tpch "1"
                    ;;
                sort_tpch)
                    # same data as for tpch
                    data_tpch "1"
                    ;;
                sort_tpch10)
                    # same data as for tpch10
                    data_tpch "10"
                    ;;
                topk_tpch)
                    # same data as for tpch
                    data_tpch "1"
                    ;;
                nlj)
                    # nlj uses range() function, no data generation needed
                    echo "NLJ benchmark does not require data generation"
                    ;;
                hj)
                    # hj uses range() function, no data generation needed
                    echo "HJ benchmark does not require data generation"
                    ;;
                compile_profile)
                    data_tpch "1"
                    ;;
                *)
                    echo "Error: unknown benchmark '$BENCHMARK' for data generation"
                    usage
                    ;;
            esac
            ;;
        run)
            # Parse positional parameters
            BENCHMARK=${ARG2:-"${BENCHMARK}"}
            EXTRA_ARGS=("${POSITIONAL_ARGS[@]:2}")
            PROFILE_ARGS=()
            QUERY=""
            QUERY_ARG=""
            if [ "$BENCHMARK" = "compile_profile" ]; then
                PROFILE_ARGS=("${EXTRA_ARGS[@]}")
            else
                QUERY=${EXTRA_ARGS[0]}
                if [ -n "$QUERY" ]; then
                    QUERY_ARG="--query ${QUERY}"
                fi
            fi
            BRANCH_NAME=$(cd "${DATAFUSION_DIR}" && git rev-parse --abbrev-ref HEAD)
            BRANCH_NAME=${BRANCH_NAME//\//_} # mind blowing syntax to replace / with _
            RESULTS_NAME=${RESULTS_NAME:-"${BRANCH_NAME}"}
            RESULTS_DIR=${RESULTS_DIR:-"$SCRIPT_DIR/results/$RESULTS_NAME"}

            echo "***************************"
            echo "DataFusion Benchmark Script"
            echo "COMMAND: ${COMMAND}"
            echo "BENCHMARK: ${BENCHMARK}"
            if [ "$BENCHMARK" = "compile_profile" ]; then
                echo "PROFILES: ${PROFILE_ARGS[*]:-All}"
            else
                echo "QUERY: ${QUERY:-All}"
            fi
            echo "DATAFUSION_DIR: ${DATAFUSION_DIR}"
            echo "BRANCH_NAME: ${BRANCH_NAME}"
            echo "DATA_DIR: ${DATA_DIR}"
            echo "RESULTS_DIR: ${RESULTS_DIR}"
            echo "CARGO_COMMAND: ${CARGO_COMMAND}"
            echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
            echo "***************************"

            # navigate to the appropriate directory
            pushd "${DATAFUSION_DIR}/benchmarks" > /dev/null
            mkdir -p "${RESULTS_DIR}"
            mkdir -p "${DATA_DIR}"
            case "$BENCHMARK" in
                all)
                    run_tpch "1" "parquet"
                    run_tpch "1" "csv"
                    run_tpch_mem "1"
                    run_tpch "10" "parquet"
                    run_tpch "10" "csv"
                    run_tpch_mem "10"
                    run_cancellation
                    run_clickbench_1
                    run_clickbench_partitioned
                    run_clickbench_pushdown
                    run_clickbench_extended
                    run_h2o "SMALL" "PARQUET" "groupby"
                    run_h2o "MEDIUM" "PARQUET" "groupby"
                    run_h2o "BIG" "PARQUET" "groupby"
                    run_h2o_join "SMALL" "PARQUET" "join"
                    run_h2o_join "MEDIUM" "PARQUET" "join"
                    run_h2o_join "BIG" "PARQUET" "join"
                    run_imdb
                    run_external_aggr
                    run_nlj
                    run_hj
                    ;;
                tpch)
                    run_tpch "1" "parquet"
                    ;;
                tpch_csv)
                    run_tpch "1" "csv"
                    ;;
                tpch_mem)
                    run_tpch_mem "1"
                    ;;
                tpch10)
                    run_tpch "10" "parquet"
                    ;;
                tpch_csv10)
                    run_tpch "10" "csv"
                    ;;
                tpch_mem10)
                    run_tpch_mem "10"
                    ;;
                cancellation)
                    run_cancellation
                    ;;
                clickbench_1)
                    run_clickbench_1
                    ;;
                clickbench_partitioned)
                    run_clickbench_partitioned
                    ;;
                clickbench_pushdown)
                    run_clickbench_pushdown
                    ;;
                clickbench_extended)
                    run_clickbench_extended
                    ;;
                imdb)
                    run_imdb
                    ;;
                h2o_small)
                    run_h2o "SMALL" "CSV" "groupby"
                    ;;
                h2o_medium)
                    run_h2o "MEDIUM" "CSV" "groupby"
                    ;;
                h2o_big)
                    run_h2o "BIG" "CSV" "groupby"
                    ;;
                h2o_small_join)
                    run_h2o_join "SMALL" "CSV" "join"
                    ;;
                h2o_medium_join)
                    run_h2o_join "MEDIUM" "CSV" "join"
                    ;;
                h2o_big_join)
                    run_h2o_join "BIG" "CSV" "join"
                    ;;
                h2o_small_window)
                    run_h2o_window "SMALL" "CSV" "window"
                    ;;
                h2o_medium_window)
                    run_h2o_window "MEDIUM" "CSV" "window"
                    ;;
                h2o_big_window) 
                    run_h2o_window "BIG" "CSV" "window"
                    ;;
                h2o_small_parquet)
                    run_h2o "SMALL" "PARQUET"
                    ;;
                h2o_medium_parquet)
                    run_h2o "MEDIUM" "PARQUET"
                    ;;
                h2o_big_parquet)
                    run_h2o "BIG" "PARQUET"
                    ;;
                h2o_small_join_parquet)
                    run_h2o_join "SMALL" "PARQUET"
                    ;;
                h2o_medium_join_parquet)
                    run_h2o_join "MEDIUM" "PARQUET"
                    ;;
                h2o_big_join_parquet)
                    run_h2o_join "BIG" "PARQUET"
                    ;;
                # h2o window benchmark uses the same data as the h2o join
                h2o_small_window_parquet)
                    run_h2o_window "SMALL" "PARQUET"
                    ;;
                h2o_medium_window_parquet)
                    run_h2o_window "MEDIUM" "PARQUET"
                    ;;
                h2o_big_window_parquet)
                    run_h2o_window "BIG" "PARQUET"
                    ;;
                external_aggr)
                    run_external_aggr
                    ;;
                sort_tpch)
                    run_sort_tpch "1"
                    ;;
                sort_tpch10)
                    run_sort_tpch "10"
                    ;;
                topk_tpch)
                    run_topk_tpch
                    ;;
                nlj)
                    run_nlj
                    ;;
                hj)
                    run_hj
                    ;;
                compile_profile)
                    run_compile_profile "${PROFILE_ARGS[@]}"
                    ;;
                *)
                    echo "Error: unknown benchmark '$BENCHMARK' for run"
                    usage
                    ;;
            esac
            popd > /dev/null
            echo "Done"
            ;;
        compare)
            compare_benchmarks "$ARG2" "$ARG3"
            ;;
        compare_detail)
            compare_benchmarks "$ARG2" "$ARG3" "--detailed"
            ;;
        venv)
            setup_venv
            ;;
        "")
            usage
            ;;
        *)
            echo "Error: unknown command: $COMMAND"
            usage
            ;;
    esac
}



# Creates TPCH data at a certain scale factor, if it doesn't already
# exist
#
# call like: data_tpch($scale_factor)
#
# Creates data in $DATA_DIR/tpch_sf1 for scale factor 1
# Creates data in $DATA_DIR/tpch_sf10 for scale factor 10
# etc
data_tpch() {
    SCALE_FACTOR=$1
    if [ -z "$SCALE_FACTOR" ] ; then
        echo "Internal error: Scale factor not specified"
        exit 1
    fi

    TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
    echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."

    # Ensure the target data directory exists
    mkdir -p "${TPCH_DIR}"

    # Create 'tbl' (CSV format) data into $DATA_DIR if it does not already exist
    FILE="${TPCH_DIR}/supplier.tbl"
    if test -f "${FILE}"; then
        echo " tbl files exist ($FILE exists)."
    else
        echo " creating tbl files with tpch_dbgen..."
        docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
    fi

    # Copy expected answers into the ./data/answers directory if it does not already exist
    FILE="${TPCH_DIR}/answers/q1.out"
    if test -f "${FILE}"; then
        echo " Expected answers exist (${FILE} exists)."
    else
        echo " Copying answers to ${TPCH_DIR}/answers"
        mkdir -p "${TPCH_DIR}/answers"
        docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main  -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
    fi

    # Create 'parquet' files from tbl
    FILE="${TPCH_DIR}/supplier"
    if test -d "${FILE}"; then
        echo " parquet files exist ($FILE exists)."
    else
        echo " creating parquet files using benchmark binary ..."
        pushd "${SCRIPT_DIR}" > /dev/null
        $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
        popd > /dev/null
    fi

    # Create 'csv' files from tbl
    FILE="${TPCH_DIR}/csv/supplier"
    if test -d "${FILE}"; then
        echo " csv files exist ($FILE exists)."
    else
        echo " creating csv files using benchmark binary ..."
        pushd "${SCRIPT_DIR}" > /dev/null
        $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
        popd > /dev/null
    fi
}

# Runs the tpch benchmark
run_tpch() {
    SCALE_FACTOR=$1
    if [ -z "$SCALE_FACTOR" ] ; then
        echo "Internal error: Scale factor not specified"
        exit 1
    fi
    TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

    RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running tpch benchmark..."

    FORMAT=$2
    debug_run $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the tpch in memory
run_tpch_mem() {
    SCALE_FACTOR=$1
    if [ -z "$SCALE_FACTOR" ] ; then
        echo "Internal error: Scale factor not specified"
        exit 1
    fi
    TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

    RESULTS_FILE="${RESULTS_DIR}/tpch_mem_sf${SCALE_FACTOR}.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running tpch_mem benchmark..."
    # -m means in memory
    debug_run $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the compile profile benchmark helper
run_compile_profile() {
    local profiles=("$@")
    local runner="${SCRIPT_DIR}/compile_profile.py"
    local data_path="${DATA_DIR}/tpch_sf1"

    echo "Running compile profile benchmark..."
    local cmd=(python3 "${runner}" --data "${data_path}")
    if [ ${#profiles[@]} -gt 0 ]; then
        cmd+=(--profiles "${profiles[@]}")
    fi
    debug_run "${cmd[@]}"
}

# Runs the cancellation benchmark
run_cancellation() {
    RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running cancellation benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
}


# Downloads the single file hits.parquet ClickBench datasets from
# https://github.com/ClickHouse/ClickBench/tree/main#data-loading
#
# Creates data in $DATA_DIR/hits.parquet
data_clickbench_1() {
    pushd "${DATA_DIR}" > /dev/null

    # Avoid downloading if it already exists and is the right size
    OUTPUT_SIZE=$(wc -c hits.parquet  2>/dev/null  | awk '{print $1}' || true)
    echo -n "Checking hits.parquet..."
    if test "${OUTPUT_SIZE}" = "14779976446"; then
        echo -n "... found ${OUTPUT_SIZE} bytes ..."
    else
        URL="https://datasets.clickhouse.com/hits_compatible/hits.parquet"
        echo -n "... downloading ${URL} (14GB) ... "
        wget --continue ${URL}
    fi
    echo " Done"
    popd > /dev/null
}

# Downloads the 100 file partitioned ClickBench datasets from
# https://github.com/ClickHouse/ClickBench/tree/main#data-loading
#
# Creates data in $DATA_DIR/hits_partitioned
data_clickbench_partitioned() {
    MAX_CONCURRENT_DOWNLOADS=10

    mkdir -p "${DATA_DIR}/hits_partitioned"
    pushd "${DATA_DIR}/hits_partitioned" > /dev/null

    echo -n "Checking hits_partitioned..."
    OUTPUT_SIZE=$(wc -c -- * 2>/dev/null | tail -n 1  | awk '{print $1}' || true)
    if test "${OUTPUT_SIZE}" = "14737666736"; then
        echo -n "... found ${OUTPUT_SIZE} bytes ..."
    else
        echo -n " downloading with ${MAX_CONCURRENT_DOWNLOADS} parallel workers"
        seq 0 99 | xargs -P${MAX_CONCURRENT_DOWNLOADS} -I{} bash -c 'wget -q --continue https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{}.parquet && echo -n "."'
    fi

    echo " Done"
    popd > /dev/null
}


# Runs the clickbench benchmark with a single large parquet file
run_clickbench_1() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running clickbench (1 file) benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench  --iterations 5 --path "${DATA_DIR}/hits.parquet"  --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

 # Runs the clickbench benchmark with the partitioned parquet dataset (100 files)
run_clickbench_partitioned() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running clickbench (partitioned, 100 files) benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench  --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}


 # Runs the clickbench benchmark with the partitioned parquet files and filter_pushdown enabled
run_clickbench_pushdown() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
}


# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running clickbench (1 file) extended benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench  --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors)
# https://event.cwi.nl/da/job/imdb.tgz
data_imdb() {
    local imdb_dir="${DATA_DIR}/imdb"
    local imdb_temp_gz="${imdb_dir}/imdb.tgz"
    local imdb_url="https://event.cwi.nl/da/job/imdb.tgz"

   # imdb has 21 files, we just separate them into 3 groups for better readability
    local first_required_files=(
        "aka_name.parquet"
        "aka_title.parquet"
        "cast_info.parquet"
        "char_name.parquet"
        "comp_cast_type.parquet"
        "company_name.parquet"
        "company_type.parquet"
    )

    local second_required_files=(
        "complete_cast.parquet"
        "info_type.parquet"
        "keyword.parquet"
        "kind_type.parquet"
        "link_type.parquet"
        "movie_companies.parquet"
        "movie_info.parquet"
    )

    local third_required_files=(
        "movie_info_idx.parquet"
        "movie_keyword.parquet"
        "movie_link.parquet"
        "name.parquet"
        "person_info.parquet"
        "role_type.parquet"
        "title.parquet"
    )

    # Combine the three arrays into one
    local required_files=("${first_required_files[@]}" "${second_required_files[@]}" "${third_required_files[@]}")
    local convert_needed=false

    # Create directory if it doesn't exist
    mkdir -p "${imdb_dir}"

    # Check if required files exist
    for file in "${required_files[@]}"; do
        if [ ! -f "${imdb_dir}/${file}" ]; then
            convert_needed=true
            break
        fi
    done

    if [ "$convert_needed" = true ]; then
        # Expected size of the dataset
        expected_size="1263193115" # 1.18 GB

        echo -n "Looking for imdb.tgz... "
        if [ -f "${imdb_temp_gz}" ]; then
            echo "found"
            echo -n "Checking size... "
            OUTPUT_SIZE=$(wc -c "${imdb_temp_gz}" 2>/dev/null | awk '{print $1}' || true)

            #Checking the size of the existing file
            if [ "${OUTPUT_SIZE}" = "${expected_size}" ]; then
                # Existing file is of the expected size, no need for download
                echo "OK ${OUTPUT_SIZE}"
            else
                # Existing file is partially installed, remove it and initiate a new download
                echo "MISMATCH"
                echo "Size less than expected: ${OUTPUT_SIZE} found, ${expected_size} required"
                echo "Downloading IMDB dataset..."
                rm -f "${imdb_temp_gz}"

                # Download the dataset
                curl -o "${imdb_temp_gz}" "${imdb_url}"

                # Size check of the installed file
                DOWNLOADED_SIZE=$(wc -c "${imdb_temp_gz}" | awk '{print $1}')
                if [ "${DOWNLOADED_SIZE}" != "${expected_size}" ]; then
                    echo "Error: Download size mismatch"
                    echo "Expected: ${expected_size}"
                    echo "Got: ${DOWNLADED_SIZE}"
                    echo "Please re-initiate the download"
                    return 1
                fi
            fi
        else
            # No existing file found, initiate a new download
            echo "not found"
            echo "Downloading IMDB dataset ${expected_size} expected)..."
            # Download the dataset
            curl -o "${imdb_temp_gz}" "${imdb_url}"
        fi

        # Extract the dataset
        tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}"
        $CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet
        echo "IMDB dataset downloaded and extracted."

    else
        echo "IMDB dataset already exists and contains required parquet files."
    fi
}

# Runs the imdb benchmark
run_imdb() {
    IMDB_DIR="${DATA_DIR}/imdb"

    RESULTS_FILE="${RESULTS_DIR}/imdb.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running imdb benchmark..."
    debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
}

data_h2o() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}

    # Function to compare Python versions
    version_ge() {
        [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
    }

    export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

    # Find the highest available Python version (3.10 or higher)
    REQUIRED_VERSION="3.10"
    PYTHON_CMD=$(command -v python3 || true)

    if [ -n "$PYTHON_CMD" ]; then
        PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
        if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
            echo "Found Python version $PYTHON_VERSION, which is suitable."
        else
            echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
            PYTHON_CMD=""
        fi
    fi

   # Search for suitable Python versions if the default is unsuitable
   if [ -z "$PYTHON_CMD" ]; then
       # Loop through all available Python3 commands on the system
       for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
           if command -v "$CMD" &> /dev/null; then
               PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
               if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
                   PYTHON_CMD="$CMD"
                   echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
                   break
               fi
           fi
       done
   fi

    # If no suitable Python version found, exit with an error
    if [ -z "$PYTHON_CMD" ]; then
        echo "Python 3.10 or higher is required. Please install it."
        return 1
    fi

    echo "Using Python command: $PYTHON_CMD"

    # Install falsa and other dependencies
    echo "Installing falsa..."

    # Set virtual environment directory
    VIRTUAL_ENV="${PWD}/venv"

    # Create a virtual environment using the detected Python command
    $PYTHON_CMD -m venv "$VIRTUAL_ENV"

    # Activate the virtual environment and install dependencies
    source "$VIRTUAL_ENV/bin/activate"

    # Ensure 'falsa' is installed (avoid unnecessary reinstall)
    pip install --quiet --upgrade falsa

    # Create directory if it doesn't exist
    H2O_DIR="${DATA_DIR}/h2o"
    mkdir -p "${H2O_DIR}"

    # Generate h2o test data
    echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
    falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

    # Deactivate virtual environment after completion
    deactivate
}

data_h2o_join() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}

    # Function to compare Python versions
    version_ge() {
        [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
    }

    export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

    # Find the highest available Python version (3.10 or higher)
    REQUIRED_VERSION="3.10"
    PYTHON_CMD=$(command -v python3 || true)

    if [ -n "$PYTHON_CMD" ]; then
        PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
        if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
            echo "Found Python version $PYTHON_VERSION, which is suitable."
        else
            echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
            PYTHON_CMD=""
        fi
    fi

   # Search for suitable Python versions if the default is unsuitable
   if [ -z "$PYTHON_CMD" ]; then
       # Loop through all available Python3 commands on the system
       for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
           if command -v "$CMD" &> /dev/null; then
               PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
               if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
                   PYTHON_CMD="$CMD"
                   echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
                   break
               fi
           fi
       done
   fi

    # If no suitable Python version found, exit with an error
    if [ -z "$PYTHON_CMD" ]; then
        echo "Python 3.10 or higher is required. Please install it."
        return 1
    fi

    echo "Using Python command: $PYTHON_CMD"

    # Install falsa and other dependencies
    echo "Installing falsa..."

    # Set virtual environment directory
    VIRTUAL_ENV="${PWD}/venv"

    # Create a virtual environment using the detected Python command
    $PYTHON_CMD -m venv "$VIRTUAL_ENV"

    # Activate the virtual environment and install dependencies
    source "$VIRTUAL_ENV/bin/activate"

    # Ensure 'falsa' is installed (avoid unnecessary reinstall)
    pip install --quiet --upgrade falsa

    # Create directory if it doesn't exist
    H2O_DIR="${DATA_DIR}/h2o"
    mkdir -p "${H2O_DIR}"

    # Generate h2o test data
    echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
    falsa join --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

    # Deactivate virtual environment after completion
    deactivate
}

# Runner for h2o groupby benchmark
run_h2o() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}
    DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
    RUN_Type=${3:-"groupby"}

    # Data directory and results file path
    H2O_DIR="${DATA_DIR}/h2o"
    RESULTS_FILE="${RESULTS_DIR}/h2o.json"

    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running h2o groupby benchmark..."

    # Set the file name based on the size
    case "$SIZE" in
        "SMALL")
            FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}"  # For small dataset
            ;;
        "MEDIUM")
            FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}"  # For medium dataset
            ;;
        "BIG")
            FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}"  # For big dataset
            ;;
        *)
            echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
            return 1
            ;;
    esac

     # Set the query file name based on the RUN_Type
    QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"

    # Run the benchmark using the dynamically constructed file path and query file
    debug_run $CARGO_COMMAND --bin dfbench -- h2o \
        --iterations 3 \
        --path "${H2O_DIR}/${FILE_NAME}" \
        --queries-path "${QUERY_FILE}" \
        -o "${RESULTS_FILE}" \
         ${QUERY_ARG}
}

# Utility function to run h2o join/window benchmark
h2o_runner() {
    # Default values for size and data format
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"CSV"}
    DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
    RUN_Type=${3:-"join"}

    # Data directory and results file path
    H2O_DIR="${DATA_DIR}/h2o"
    RESULTS_FILE="${RESULTS_DIR}/h2o_${RUN_Type}.json"

    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running h2o ${RUN_Type} benchmark..."

    # Set the file name based on the size
    case "$SIZE" in
        "SMALL")
            X_TABLE_FILE_NAME="J1_1e7_NA_0.${DATA_FORMAT}"
            SMALL_TABLE_FILE_NAME="J1_1e7_1e1_0.${DATA_FORMAT}"
            MEDIUM_TABLE_FILE_NAME="J1_1e7_1e4_0.${DATA_FORMAT}"
            LARGE_TABLE_FILE_NAME="J1_1e7_1e7_NA.${DATA_FORMAT}"
            ;;
        "MEDIUM")
            X_TABLE_FILE_NAME="J1_1e8_NA_0.${DATA_FORMAT}"
            SMALL_TABLE_FILE_NAME="J1_1e8_1e2_0.${DATA_FORMAT}"
            MEDIUM_TABLE_FILE_NAME="J1_1e8_1e5_0.${DATA_FORMAT}"
            LARGE_TABLE_FILE_NAME="J1_1e8_1e8_NA.${DATA_FORMAT}"
            ;;
        "BIG")
            X_TABLE_FILE_NAME="J1_1e9_NA_0.${DATA_FORMAT}"
            SMALL_TABLE_FILE_NAME="J1_1e9_1e3_0.${DATA_FORMAT}"
            MEDIUM_TABLE_FILE_NAME="J1_1e9_1e6_0.${DATA_FORMAT}"
            LARGE_TABLE_FILE_NAME="J1_1e9_1e9_NA.${DATA_FORMAT}"
            ;;
        *)
            echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
            return 1
            ;;
    esac

    # Set the query file name based on the RUN_Type
    QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"

    debug_run $CARGO_COMMAND --bin dfbench -- h2o \
        --iterations 3 \
        --join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
        --queries-path "${QUERY_FILE}" \
        -o "${RESULTS_FILE}" \
         ${QUERY_ARG}
}

# Runners for h2o join benchmark
run_h2o_join() {
    h2o_runner "$1" "$2" "join"
}

# Runners for h2o join benchmark
run_h2o_window() {
    h2o_runner "$1" "$2" "window"
}

# Runs the external aggregation benchmark
run_external_aggr() {
    # Use TPC-H SF1 dataset
    TPCH_DIR="${DATA_DIR}/tpch_sf1"
    RESULTS_FILE="${RESULTS_DIR}/external_aggr.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running external aggregation benchmark..."

    # Only parquet is supported.
    # Since per-operator memory limit is calculated as (total-memory-limit /
    # number-of-partitions), and by default `--partitions` is set to number of
    # CPU cores, we set a constant number of partitions to prevent this
    # benchmark to fail on some machines.
    debug_run $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the sort integration benchmark
run_sort_tpch() {
    SCALE_FACTOR=$1
    if [ -z "$SCALE_FACTOR" ] ; then
        echo "Internal error: Scale factor not specified"
        exit 1
    fi
    TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
    RESULTS_FILE="${RESULTS_DIR}/sort_tpch${SCALE_FACTOR}.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running sort tpch benchmark..."

    debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the sort tpch integration benchmark with limit 100 (topk)
run_topk_tpch() {
    TPCH_DIR="${DATA_DIR}/tpch_sf1"
    RESULTS_FILE="${RESULTS_DIR}/run_topk_tpch.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running topk tpch benchmark..."

    $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
}

# Runs the nlj benchmark
run_nlj() {
    RESULTS_FILE="${RESULTS_DIR}/nlj.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running nlj benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
}

# Runs the hj benchmark
run_hj() {
    RESULTS_FILE="${RESULTS_DIR}/hj.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running hj benchmark..."
    debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
}


compare_benchmarks() {
    BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
    BRANCH1="$1"
    BRANCH2="$2"
    OPTS="$3"

    if [ -z "$BRANCH1" ] ; then
        echo "<branch1> not specified. Available branches:"
        ls -1 "${BASE_RESULTS_DIR}"
        exit 1
    fi

    if [ -z "$BRANCH2" ] ; then
        echo "<branch2> not specified"
        ls -1 "${BASE_RESULTS_DIR}"
        exit 1
    fi

    echo "Comparing ${BRANCH1} and ${BRANCH2}"
    for RESULTS_FILE1 in "${BASE_RESULTS_DIR}/${BRANCH1}"/*.json ; do
	BENCH=$(basename "${RESULTS_FILE1}")
        RESULTS_FILE2="${BASE_RESULTS_DIR}/${BRANCH2}/${BENCH}"
        if test -f "${RESULTS_FILE2}" ; then
            echo "--------------------"
            echo "Benchmark ${BENCH}"
            echo "--------------------"
            PATH=$VIRTUAL_ENV/bin:$PATH python3 "${SCRIPT_DIR}"/compare.py $OPTS "${RESULTS_FILE1}" "${RESULTS_FILE2}"
        else
            echo "Note: Skipping ${RESULTS_FILE1} as ${RESULTS_FILE2} does not exist"
        fi
    done

}

setup_venv() {
    python3 -m venv "$VIRTUAL_ENV"
    PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
}

# And start the process up
main
